Yesterday we wrapped up a quick Cassandra install and marathoned through basic CRUD, UDTs, and a few slightly more exotic operations in cqlsh, so today of course we push on and have Spark talk to Cassandra!! Remember how Cassandra has no join? Huh... what a coincidence, Spark just happens to have one XDDD. Ahem... back on topic, let's see how to connect to Cassandra through Spark!
First, add the dependency declarations to sbt's build.sbt:
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0-M3"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
Since both our Cassandra and Spark are fairly new, note that the connector version here is 2.0.0-M3, i.e. the connector for Spark 2.0 and Cassandra 3.x.
The complete, bare-bones build.sbt for this exercise environment looks like this:
name := "Spark"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "log4j" % "log4j" % "1.2.16"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8-assembly_2.11" % "2.0.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0-M3"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
If you later deploy the code to production, it's best to pair this with something like sbt-assembly.
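For reference, here is a minimal sketch of what that wiring could look like; the plugin version below is an assumption, so check sbt-assembly's README for the release matching your sbt:
// project/plugins.sbt (sketch; verify the sbt-assembly version for your sbt release)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
// in build.sbt, Spark artifacts are usually marked "provided" so the fat jar
// produced by `sbt assembly` doesn't bundle what the cluster already ships with
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" % "provided"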
Let's create a test keyspace and a kv table, then read and write them from Spark:
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);
Write two rows into the kv table and check:
cqlsh:test> INSERT INTO test.kv(key, value) VALUES ('key1', 1);
cqlsh:test> INSERT INTO test.kv(key, value) VALUES ('key2', 2);
cqlsh:test> SELECT * FROM kv;
 key  | value
------+-------
 key1 |     1
 key2 |     2
OK, back to the code. Import the library Spark needs to talk to Cassandra:
import com.datastax.spark.connector._
Everything sits under com.datastax.spark.connector, so here we simply grab it all with the _ wildcard.
[Snippet.65] spark-cassandra Entry Point
Spark's entry point into Cassandra is sc.cassandraTable: one simple line and you're connected to a table.
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count) ①
println(rdd.first) ②
println(rdd.map(_.getInt("value")).sum) ③
//Output:
//2
//CassandraRow{key: key1, value: 1}
//3.0
① rdd.count gives the row count directly, much like COUNT in an RDBMS
② first pulls out the first row
③ map the rdd to the value column, then sum it up, much like SUM in an RDBMS
[Snippet.66] spark-cassandra saveToCassandra
So how do we write data into a Cassandra table?
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4))) ①
collection.saveToCassandra("test", "kv", SomeColumns("key", "value")) ②
① Build a simple pair RDD simulating the two columns of kv
② saveToCassandra writes into the kv table of the test keyspace, and SomeColumns maps the tuple elements, in order, onto the key and value columns.
And how do we write an rdd out as a brand-new Cassandra table? Use saveAsCassandraTable or saveAsCassandraTableEx. saveAsCassandraTable is the simpler of the two, while saveAsCassandraTableEx lets you control the details of the new table's definition.
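A minimal sketch of the simpler one (not run in this article; the KVRecord case class and the kv_copy table name are made up for illustration):
// saveAsCassandraTable creates the table itself, inferring the schema from the
// RDD's element type; by default the first field becomes the partition key
case class KVRecord(key: String, value: Int)
val newRows = sc.parallelize(Seq(KVRecord("key5", 5), KVRecord("key6", 6)))
newRows.saveAsCassandraTable("test", "kv_copy")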
Check with cqlsh (or re-run the reading program to confirm the writes landed):
cqlsh:test> SELECT * FROM kv;
 key  | value
------+-------
 key1 |     1
 key4 |     4
 key3 |     3
 key2 |     2
The complete read/write snippet:
package SparkIronMan
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by joechh on 2017/1/13.
*/
object Day29_Cassandra extends App {
  val conf = new SparkConf()
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setMaster("local[2]")
    .setAppName("cassandra")
  val sc = new SparkContext(conf)

  val rdd = sc.cassandraTable("test", "kv")
  println(rdd.count)
  println(rdd.first)
  println(rdd.map(_.getInt("value")).sum)
  rdd.foreach(println)

  val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
  collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
}
That wraps up the five-minute crash course; on to the next part:
[Snippet.67] Building conf and sc for Cassandra
If Cassandra has access control enabled (username/password required), or you want to connect to nodes at a specific address, the following setup is more broadly applicable:
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "192.168.123.10")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
val sc = new SparkContext("spark://192.168.123.10:7077", "test", conf)
One thing to note: the contact point here can also be a list (Address1:Port1,Address2:Port2,...), but it only describes the initial contact nodes. Once the client has connected to the cluster, it picks up information about every available node in the whole cluster. This means that even if all the initial contact nodes die while the program is running, the service keeps going uninterrupted as long as the cluster still has live nodes. This notion of initial contact points pops up everywhere in distributed systems; zookeeper, ElasticSearch, Kafka and friends all have a similar setting.
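For instance, a sketch of handing the connector several contact points; as far as I know this connector takes a comma-separated host list, with the native port configured separately (9042 is Cassandra's default):
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "192.168.123.10,192.168.123.11")
  .set("spark.cassandra.connection.port", "9042")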
Also, if you don't convert it, the value returned by sc.cassandraTable("keyspace name", "table name") has the type CassandraRDD[CassandraRow], an object that can later be mapped onto other objects very conveniently (object mapping).
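As a quick taste of that object mapping, a minimal sketch against the test.kv table from earlier: the rows are read straight into a case class whose field names match the column names.
case class KV(key: String, value: Int) // field names must line up with column names
val typedRdd = sc.cassandraTable[KV]("test", "kv")
typedRdd.collect.foreach(println) // should print something like KV(key1,1), KV(key2,2)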
Open another small test table under the test keyspace and stuff in some test values:
cqlsh:test> CREATE TABLE test.words (word text PRIMARY KEY, count int);
cqlsh:test> INSERT INTO test.words (word, count) VALUES ('foo', 20);
cqlsh:test> INSERT INTO test.words (word, count) VALUES ('bar', 20);
You know how to read the values back by now, right?
val rdd = sc.cassandraTable("test", "words")
rdd.collect.foreach(println) // RDD.toArray was removed in Spark 2.0; collect does the job
//Output
//CassandraRow{word: bar, count: 20}
//CassandraRow{word: foo, count: 20}
At this point the type of every row is CassandraRow, since we didn't specify anything else. What if we already have a type in mind?
[Snippet.68] Reading a column as a specific type
If you know a column's type, how do you extract it as that type? With the get family of functions!
val firstRow = rdd.first
firstRow.getInt("count") // 20
firstRow.getLong("count") // 20L
Besides getInt, getLong and friends, you can also use the generic get together with a type cast:
firstRow.get[Int]("count") // 20
firstRow.get[Long]("count") // 20L
firstRow.get[BigInt]("count") // BigInt(20)
If you're worried a value might be null and trigger a NullPointerException, wrap it in another layer of Option:
firstRow.getIntOption("count") // Some(20)
firstRow.get[Option[Int]]("count").getOrElse(0) // 20
[Snippet.69] Reading collection columns
Having seen plain columns, what about columns that hold collections? Let's open yet another table to play with~
cqlsh:test> CREATE TABLE test.users (username text PRIMARY KEY, emails SET<text>);
cqlsh:test> INSERT INTO test.users (username, emails)
VALUES ('joe', {'joechh@gmail.com', 'joeAnothermail@gmail.com'});
cqlsh:test> SELECT * from users ;
 username | emails
----------+--------------------------------------------------
      joe | {'joeAnothermail@gmail.com', 'joechh@gmail.com'}
Table's ready; time to pull the values out of the set~
val firstSetrow = sc.cassandraTable("test", "users").first ①
println(firstSetrow) ②
println(firstSetrow.getList[String]("emails")) ③
println(firstSetrow.get[List[String]]("emails")) ④
println(firstSetrow.get[String]("emails")) ⑤
//Output:
//CassandraRow{username: joe, emails: {joeAnothermail@gmail.com,joechh@gmail.com}}
//Vector(joeAnothermail@gmail.com, joechh@gmail.com)
//List(joeAnothermail@gmail.com, joechh@gmail.com)
//{joeAnothermail@gmail.com,joechh@gmail.com}
① Fetch one row
② No conversion, so it prints as a CassandraRow
③ Careful: CassandraRow's getList actually hands you back a Vector, not a plain List!
④ If you do want a List, use get yourself and convert to a List of String
⑤ You can also turn the whole column into a String
What if there's a UDT? Suppose we have a UDT shaped like this:
CREATE TYPE test.address (city text, street text, number int);
CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>);
Then you can extract the UDT object with getUDTValue and use the ordinary get calls on it, for example:
val row = sc.cassandraTable("test", "companies").first // e.g. take one row from companies
val address: UDTValue = row.getUDTValue("address")
val city = address.getString("city")
val street = address.getString("street")
val number = address.getInt("number")
I won't run a live example here; if you're curious, play around with yesterday's UDT.
At long last we've reached the Join with Spark I kept going on about! (Yes, I just love to murmur... XD) Let's build two small tables with a shared column to join on. The plan:
customer_info: (cust_id text PRIMARY KEY, name text)
shopping_info: (cust_id text PRIMARY KEY, date date, item text, price double)
cqlsh:test> create table customer_info(cust_id text PRIMARY KEY, name text);
cqlsh:test> INSERT INTO customer_info (cust_id , name ) VALUES ( '1','joe');
cqlsh:test> INSERT INTO customer_info (cust_id , name ) VALUES ( '2','doris');
cqlsh:test> SELECT * from customer_info ;
 cust_id | name
---------+-------
       2 | doris
       1 | joe

(2 rows)
OK, the first table is fine. Build the next one and put 3 rows into it:
cqlsh:test> create table shopping_info(cust_id text PRIMARY KEY, date date, item text, price double) ;
cqlsh:test> INSERT INTO shopping_info (cust_id , date , item , price ) VALUES ( '1','2017-01-02','book',30.5);
cqlsh:test> INSERT INTO shopping_info (cust_id , date , item , price ) VALUES ( '2','2017-01-02','toys',40.87);
cqlsh:test> INSERT INTO shopping_info (cust_id , date , item , price ) VALUES ( '2','2017-01-05','cake',10);
cqlsh:test> SELECT * from shopping_info ;
 cust_id | date       | item | price
---------+------------+------+-------
       2 | 2017-01-05 | cake |    10
       1 | 2017-01-02 | book |  30.5
Huh? ...why are only two rows left?
It turns out the data modeling was wrong: with cust_id alone as the key, the last two rows (both with cust_id=2) pile onto each other and only one survives. So how should a purchase-record table be modeled in Cassandra?
CREATE TABLE test.shopping_info (
cust_id text,
date date,
shopping_id uuid,
item text,
price double,
PRIMARY KEY ((cust_id), date, shopping_id) ①
);
① The primary key becomes ((cust_id), date, shopping_id). The part inside the inner parentheses, (cust_id), is the so-called partition key, while date and shopping_id are clustering columns.
In short, this primary key lays out all rows of the same customer (cust_id) physically together on disk, forming one partition. The clustering columns determine the sort order within it, and the combination of the three columns provides uniqueness (that part works just like an RDBMS).
The previous key held only cust_id, which couldn't guarantee uniqueness, so the values piled on top of each other. The new table's shopping_id has type uuid; one look tells you it was added to the primary key purely to guarantee uniqueness. Anyone comfortable with an RDBMS will surely ask: shopping_id alone already gives uniqueness, so what are cust_id and date doing in there?? This is one of the biggest differences between Cassandra and an RDBMS: the primary key design must not only provide uniqueness, it must also match the queries you want to run. We'll leave it there and get back to our Spark topic.
Testing the new table:
cqlsh:test> insert into shopping_info (cust_id , date , shopping_id , item , price ) VALUES ( '1','2017-01-02',uuid(),'book',30.5);
cqlsh:test> insert into shopping_info (cust_id , date , shopping_id , item , price ) VALUES ( '2','2017-01-02',uuid(),'toys',40.87) ;
cqlsh:test> insert into shopping_info (cust_id , date , shopping_id , item , price ) VALUES ( '2','2017-01-03',uuid(),'cake',10) ;
cqlsh:test> SELECT * FROM shopping_info ;
 cust_id | date       | shopping_id              | item | price
---------+------------+--------------------------+------+-------
       2 | 2017-01-02 | de23a28b-c2ad-4018-..... | toys | 40.87
       2 | 2017-01-03 | 805ea507-cea9-468e-..... | cake |    10
       1 | 2017-01-02 | ea68f840-fc45-43ac-..... | book |  30.5
OK, no more problems.
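A small aside before we join: since cust_id is the partition key, the connector can push a per-customer filter down to Cassandra with where, instead of filtering inside Spark. A sketch (not run in this article):
// the predicate is executed by Cassandra, touching a single partition
val dorisOrders = sc.cassandraTable("test", "shopping_info").where("cust_id = ?", "2")
dorisOrders.collect.foreach(println)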
[Snippet.70] Directly joining two tables with the same partition key
If two tables have similar partition keys, you can join them directly with joinWithCassandraTable.
val internalJoin = sc.cassandraTable("test", "customer_info")
  .joinWithCassandraTable("test", "shopping_info")
internalJoin.collect.foreach(println)
//Output:
(CassandraRow{cust_id: 2, name: doris},CassandraRow{cust_id: 2, date: 2017-01-02, shopping_id: de23a28b-c2ad-4018-b52b-a459b1b670ac, item: toys, price: 40.87})
(CassandraRow{cust_id: 2, name: doris},CassandraRow{cust_id: 2, date: 2017-01-03, shopping_id: 805ea507-cea9-468e-9edc-e91512514130, item: cake, price: 10.0})
(CassandraRow{cust_id: 1, name: joe},CassandraRow{cust_id: 1, date: 2017-01-02, shopping_id: ea68f840-fc45-43ac-8775-3e52d8c73ebc, item: book, price: 30.5})
[Snippet.71] Join with Cassandra Table with Regular RDD
Can an ordinary RDD be joined with a Cassandra table? It can, again via joinWithCassandraTable, provided the ordinary RDD is a pair RDD and its key K lines up with the table's partition key columns:
val ids = sc.parallelize(List((1, "joe"), (2, "doris")))
val localJoin = ids.joinWithCassandraTable("test", "shopping_info")
localJoin.collect.foreach(println)
//Output:
((1,joe),CassandraRow{cust_id: 1, date: 2017-01-02, shopping_id: ea68f840-fc45-43ac-8775-3e52d8c73ebc, item: book, price: 30.5})
((2,doris),CassandraRow{cust_id: 2, date: 2017-01-02, shopping_id: de23a28b-c2ad-4018-b52b-a459b1b670ac, item: toys, price: 40.87})
((2,doris),CassandraRow{cust_id: 2, date: 2017-01-03, shopping_id: 805ea507-cea9-468e-9edc-e91512514130, item: cake, price: 10.0})
[Snippet.72] Reading both tables into RDDs and joining by hand
Sometimes two tables need joining, but you don't need every column of both. What then? You can read each of them into a CassandraRDD, which sometimes buys extra flexibility. How does that look?
val customers = sc.cassandraTable("test", "customer_info")
.select("cust_id", "name") ①
.as((c: String, n: String) => (c, n)) ②
val records = sc.cassandraTable("test", "shopping_info")
.select("cust_id", "date", "shipping_id", "price")
.as((c: String, d: java.util.Date, s: java.util.UUID, p: Double) => (c, (d, s, p)))
customers.join(records).collect().foreach(println)
//Output:
(1,(joe,(Mon Jan 02 00:00:00 CST 2017,ea68f840-fc45-43ac-8775-3e52d8c73ebc,30.5)))
(2,(doris,(Mon Jan 02 00:00:00 CST 2017,de23a28b-c2ad-4018-b52b-a459b1b670ac,40.87)))
(2,(doris,(Tue Jan 03 00:00:00 CST 2017,805ea507-cea9-468e-9edc-e91512514130,10.0)))
① select restricts the query to just those columns
② as converts each row into a tuple of the given types
As the results above show, the output is a nested structure, but there are plenty of ways to flatten it (flatMap, for instance, or map with a case pattern for full control):
[Snippet.73] Flattening the join result of the two tables
customers.join(records)
  .map {
    case (customer, (name, (date, shopping_id, price))) =>
      (customer, name, date, price, shopping_id)
  }
  .collect
  .foreach(println)
//Output:
(1,joe,Mon Jan 02 00:00:00 CST 2017,30.5,ea68f840-fc45-43ac-8775-3e52d8c73ebc)
(2,doris,Mon Jan 02 00:00:00 CST 2017,40.87,de23a28b-c2ad-4018-b52b-a459b1b670ac)
(2,doris,Tue Jan 03 00:00:00 CST 2017,10.0,805ea507-cea9-468e-9edc-e91512514130)
[Snippet.74] Writing the join result back to a table
There are two scenarios here: the target table already exists, or the connector creates it for you (the second is sketched after the code below). For the first, create a table up front to hold the join result:
cqlsh:test> CREATE TABLE customer_with_shopping (
cust_id text,
name text,
date date,
price double,
shopping_id uuid,
PRIMARY KEY(cust_id,shopping_id) );
Then it's just the earlier map operation combined with saveToCassandra!
customers.join(records)
  .map {
    case (customer, (name, (date, shopping_id, price))) =>
      (customer, name, date, price, shopping_id)
  }
  .saveToCassandra("test", "customer_with_shopping",
    SomeColumns("cust_id", "name", "date", "price", "shopping_id"))
Check with cqlsh that the write went through:
cqlsh:test> SELECT * from customer_with_shopping;
 cust_id | shopping_id            | date       | name  | price
---------+------------------------+------------+-------+-------
       2 | 805ea507-cea9-468e-... | 2017-01-03 | doris |    10
       2 | de23a28b-c2ad-4018-... | 2017-01-02 | doris | 40.87
       1 | ea68f840-fc45-43ac-... | 2017-01-02 | joe   |  30.5
And that brings this long piece to a close. Cassandra is of course a rabbit hole (?) in its own right, and here I could only cover some usage from the client side on Spark. Plenty of topics remain untouched, such as tuning and various partition-related operations; I'll leave those for you to explore.
Want my treasure? You can have all of it, go look for it! I left everything there~!
Hello, I have a couple of questions for you.
The first is about working with sbt.
For the sbt part, do you build a jar and then reference it? After creating build.sbt, how do I proceed? Do you build the jar with sbt package?
If it's not about building a jar, could you explain what this sbt snippet is declaring? Thanks; sbt has never been very clear to me (nor have maven and ivy...)
The second question is about resolving some errors.
What I've tried so far is downloading the matching jar from the official download page ( http://spark-packages.org/package/datastax/spark-cassandra-connector )
and then launching with spark-shell --jars XXX,
but at the saveToCassandra step it always throws this error:
ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 8)
I found what may be the answer:
Just using a single jar is not enough. Use the --packages command to pull down the jar and all of it's dependencies. https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md
If I use the syntax below instead, it does successfully write data into cassandra:
c.withSessionDo ( session => session.execute("CREATE TABLE test.fun (k int PRIMARY KEY, v int)"))
If I packaged a jar with sbt and referenced it that way instead, would this error go away?
My environment is ubuntu16.04/cassandra3.9/scala2.11.1/spark2.0.1
Q1: After setting up sbt, the most common workflow is to package a jar with sbt-assembly and ship it to the cluster with spark-submit. That sbt section was only meant to show the libs needed to integrate Cassandra in Spark.
Q2: You've already found the answer yourself: use --packages instead of --jars.
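For example, something along these lines (adjust the coordinates to your Scala and connector versions):
spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3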